Spark Getting Started Guide

Setup

To use LakeSoul in Spark, first configure the Spark catalog. LakeSoul implements its data source and catalog on top of Apache Spark's DataSourceV2 API. In addition, LakeSoul provides a Scala table API that extends the capabilities of LakeSoul tables.

Spark 3 Support Matrix

| LakeSoul | Spark Version |
|----------|---------------|
| 2.2.x-2.4.x | 3.3.x |
| 2.0.x-2.1.x | 3.1.x |

Spark Shell/SQL/PySpark

Run spark-shell, spark-sql, or pyspark with the LakeSoulSparkSessionExtension SQL extension and the LakeSoul catalog configured:

spark-sql \
  --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension \
  --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog \
  --conf spark.sql.defaultCatalog=lakesoul \
  --jars lakesoul-spark-spark-3.3-2.5.3.jar

Setup Maven Project

Include the Maven dependency in your project:

<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul-spark</artifactId>
    <version>3.3-2.5.3</version>
</dependency>

Then create a SparkSession with the LakeSoul extension and catalog:

// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val builder = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  // Register LakeSoulCatalog under the name "lakesoul" so that defaultCatalog below resolves
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
val spark = builder.getOrCreate()

// Import the implicits only after `spark` exists; toDF in the examples below needs them
import spark.implicits._

Create Namespace

First, create a namespace for LakeSoul tables. The default namespace of the LakeSoul catalog is named default.

CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;

Create Table

Create a partitioned LakeSoul table in SQL with the USING lakesoul clause, or let the DataFrameWriterV2 API create it on the first save.

CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) 
USING lakesoul
PARTITIONED BY (`date`)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';

Primary Key Table

In LakeSoul, a table with primary keys is defined as a hash-partitioned table. To create one, use the USING lakesoul clause and set two TBLPROPERTIES: 'hashPartitions' is a comma-separated list of primary key column names, and 'hashBucketNum' is the number of hash buckets.

CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');

CDC Table

Optionally, a hash-partitioned LakeSoul table can record Change Data Capture (CDC) data so that data modifications can be tracked. To create a table with CDC support, use the DDL statement for a hash-partitioned table and add the 'lakesoul_cdc_change_column' property to TBLPROPERTIES. The property value names an implicit column (op in the example below) that the table uses to handle CDC information.

CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');

Insert/Merge Data

To append new data to a non-hash-partitioned table using Spark SQL, use INSERT INTO.

To append new data to a table with a DataFrame, use the DataFrameWriterV2 API. If this is the first write to the table, the write also creates the corresponding LakeSoul table.

INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');
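
The DataFrame route described above can be sketched as follows. This is a minimal sketch, assuming a LakeSoul-enabled SparkSession like the one built in Setup. The helper name appendUsers and the sample rows are hypothetical, and the write options mirror the ones used in the Time Travel Query example of this guide.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: appends two sample rows to the table stored at tablePath.
// If no table exists at that path yet, the first write is expected to create it.
def appendUsers(spark: SparkSession, tablePath: String): Unit = {
  import spark.implicits._ // enables toDF on local collections

  Seq((3L, "Carol", "2024-01-03"), (4L, "Dan", "2024-01-03"))
    .toDF("id", "name", "date")
    .write
    .mode("append")
    .format("lakesoul")
    .option("rangePartitions", "date") // same partition column as in the SQL DDL
    .save(tablePath)
}
```

For the table created earlier, the call would be appendUsers(spark, "file:/tmp/lakesoul_namespace/lakesoul_table").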

To write new data to a hash-partitioned table using Spark SQL, use MERGE INTO, which updates the rows whose primary key matches and inserts the rest.

To do the same with a DataFrame, use the LakeSoulTable upsert API.

CREATE OR REPLACE VIEW spark_catalog.default.source_view (id, name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;


MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
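
The DataFrame counterpart of the MERGE INTO statement might look like the sketch below. It reuses LakeSoulTable.forPath and upsert as they appear in the Time Travel Query example of this guide. The helper name upsertGeorge is hypothetical, and the code assumes an active LakeSoul-enabled SparkSession.

```scala
import org.apache.spark.sql.SparkSession
import com.dmetasoul.lakesoul.tables.LakeSoulTable

// Hypothetical helper: upserts one row into the primary-key table at tablePath.
def upsertGeorge(spark: SparkSession, tablePath: String): Unit = {
  import spark.implicits._ // enables toDF on local collections

  // Same row as source_view in the MERGE INTO example
  val changes = Seq((1L, "George", "2024-01-01")).toDF("id", "name", "date")

  // Rows whose primary key (id) already exists are updated; the others are inserted
  LakeSoulTable.forPath(tablePath).upsert(changes)
}
```

For the primary key table created earlier, the path would be "file:/tmp/lakesoul_namespace/lakesoul_hash_table".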

Update Data

LakeSoul tables can be updated through the DataFrame-based table API or with a standard UPDATE statement. To update data in a table from Scala, use the LakeSoulTable updateExpr API.

UPDATE lakesoul_table SET name = 'David' WHERE id = 2;
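
A Scala equivalent of the UPDATE statement could be sketched as below. This assumes that updateExpr accepts a condition string followed by a map from column name to SQL expression string; that signature is not shown in this guide, so verify it against the Spark API docs. The helper name renameToDavid is hypothetical.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable

// Hypothetical helper mirroring:
//   UPDATE lakesoul_table SET name = 'David' WHERE id = 2
def renameToDavid(tablePath: String): Unit = {
  // Assumption: both the condition and the assigned values are SQL expression
  // strings, so the string literal 'David' keeps its single quotes.
  LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Map("name" -> "'David'"))
}
```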

Delete Data

Records can be removed from LakeSoul tables through the DataFrame-based table API or with a standard DELETE statement. To delete data from a table in Scala, use the LakeSoulTable delete API.

DELETE FROM lakesoul_table WHERE id = 1;
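
The same deletion through the table API might look like this. It is a sketch that assumes delete accepts a SQL condition string, which this guide does not spell out; the helper name deleteFirstUser is hypothetical.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable

// Hypothetical helper mirroring:
//   DELETE FROM lakesoul_table WHERE id = 1
def deleteFirstUser(tablePath: String): Unit = {
  // Assumption: the argument is a SQL predicate selecting the rows to remove
  LakeSoulTable.forPath(tablePath).delete("id = 1")
}
```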

Query Data

LakeSoul tables can be queried using a DataFrame or Spark SQL.

SELECT * FROM lakesoul_table;
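
For the DataFrame side, a minimal sketch is shown below. It loads the table by path with the lakesoul format, as the snapshot and incremental examples later in this guide do, and then applies ordinary Spark transformations. The helper name usersOn is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: returns the id and name of all rows in one date partition.
def usersOn(spark: SparkSession, tablePath: String, date: String): DataFrame =
  spark.read
    .format("lakesoul")
    .load(tablePath)
    .filter(col("date") === date) // filter on the partition column
    .select("id", "name")
```

For example, usersOn(spark, "file:/tmp/lakesoul_namespace/lakesoul_table", "2024-01-01").show() prints the matching rows.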

Time Travel Query

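LakeSoul supports time travel queries, which read the table as of any point in time in its history or return the data that changed between two commit times. The following code creates a CDC table with four commits and records a timestamp between them for use in the queries below.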

// Scala
import java.text.SimpleDateFormat

val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"

// 1st commit: create the table with six inserted rows
Seq(
  ("range1", "hash1", "insert"), ("range2", "hash2", "insert"),
  ("range3", "hash2", "insert"), ("range4", "hash2", "insert"),
  ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
  .toDF("range", "hash", "op")
  .write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions", "range")
  .option("hashPartitions", "hash")
  .option("hashBucketNum", "2")
  .option("shortTableName", "cdc_table")
  .option("lakesoul_cdc_change_column", "op")
  .save(tablePath)
// Record the timestamp after the 1st commit
val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

// 2nd commit
val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(
  Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
    .toDF("range", "hash", "op"))
// Record the timestamp after the 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

// 3rd and 4th commits
lakeTable.upsert(
  Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
    .toDF("range", "hash", "op"))
lakeTable.upsert(
  Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
    .toDF("range", "hash", "op"))
// Record the timestamp after the 3rd and 4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
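
The timestamp strings above are built three times with SimpleDateFormat. As an optional alternative, the small helper below produces the same yyyy-MM-dd HH:mm:ss format with java.time. It is only a sketch: the object name CommitTimestamps is hypothetical, and the time zone in which LakeSoul interprets these strings is not stated in this guide, so the zone is left as a parameter that defaults to the system zone.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper for building the timestamp strings used by time travel reads.
object CommitTimestamps {
  // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Formats epoch milliseconds as yyyy-MM-dd HH:mm:ss in the given zone
  def format(epochMillis: Long, zone: ZoneId = ZoneId.systemDefault()): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone).format(formatter)

  // Formats the current time in the given zone
  def now(zone: ZoneId = ZoneId.systemDefault()): String =
    format(System.currentTimeMillis(), zone)
}
```

With this helper, a line such as val versionA = CommitTimestamps.now() could stand in for the SimpleDateFormat call.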


Complete Query

// Scala
spark.sql("SELECT * FROM cdc_table")

Snapshot Query

LakeSoul supports snapshot queries, which read the table as it was at a given point in time in its history.

// Scala
spark.read.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range2")
  .option(LakeSoulOptions.READ_END_TIME, versionB)
  .option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
  .load(tablePath)

Incremental Query

LakeSoul supports incremental query to obtain a set of records that changed between a start and end time.

// Scala
spark.read.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
  .option(LakeSoulOptions.READ_START_TIME, versionA)
  .option(LakeSoulOptions.READ_END_TIME, versionB)
  .option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
  .load(tablePath)

Next steps

Next, you can learn more about using LakeSoul tables in Spark in the Spark API docs.